package com.bunnymaicai.boot.kafka.autoconfigure;

import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.producer.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Properties;
import java.util.concurrent.Future;

/**
 *
 * @author youshang
 * @date 2021/03/01 11:36
 **/
@Log4j2
@Service
public class KafkaTemplateImpl implements KafkaTemplate {
    @Autowired
    private KafkaConfiguration kafkaConfiguration;
    /**
     * 发送同步消息
     * @param topic
     * @param key
     * @param value
     * @return
     */
    @Override
    public RecordMetadata send(String topic, Object key, Object value) {
        Producer producer = this.producer();
        try {
            ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>(topic, key, value);
            Future<RecordMetadata> send = producer.send(producerRecord);
            return send.get();
        }catch (Exception e){
            log.error("发送指定分区异步消息-报错异常："+e);
            e.printStackTrace();
        }finally {
            producer.close();
        }
        return null;
    }

    /**
     * 指定分区发送同步消息
     * @param topic
     * @param partition
     * @param key
     * @param value
     * @return
     */
    @Override
    public RecordMetadata send(String topic, Integer partition, Object key, Object value) {
        Producer producer = this.producer();
        try {
            ProducerRecord producerRecord = new ProducerRecord(topic, partition, key, value);
            Future<RecordMetadata> future = producer.send(producerRecord);
            return future.get();
        }catch (Exception e){
            log.error("发送指定分区异步消息-报错异常："+e);
            e.printStackTrace();
        }finally {
            producer.close();
        }
        return null;
    }

    @Override
    public void send(String topic, Integer partition, Object key, Object value, Callback callback) {
        Producer producer = this.producer();
        try {
            ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>(topic, partition, key, value);
            producer.send(producerRecord,callback);
        }catch (Exception e){
            log.error("发送指定分区异步消息-报错异常："+e);
            e.printStackTrace();
        }finally {
            producer.close();
        }
    }

    /**
     * 异步发送kafka消息
     * @param topic
     * @param key
     * @param value
     * @param callback
     */
    @Override
    public void send(String topic, Object key, Object value, Callback callback) {
        Producer producer = this.producer();
        try {
            ProducerRecord<Object, Object> producerRecord = new ProducerRecord<Object, Object>(topic, key, value);
            producer.send(producerRecord,callback);
        }catch (Exception e){
            log.error("发送异步消息-报错异常："+e);
            e.printStackTrace();
        }finally {
            producer.close();
        }
    }

    private  Producer producer() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,kafkaConfiguration.getBootstrapServers());
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,kafkaConfiguration.getProducer().getKeySerializer());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,kafkaConfiguration.getProducer().getValueSerializer());
        org.apache.kafka.clients.producer.Producer<Object, Object> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(properties);
        return producer;
    }
}
